package cn.pengpeng.day06.avg;

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;


public class Avg {
	public static class MapTask extends Mapper<LongWritable, Text, Text, IntWritable>{
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			ObjectMapper objectMapper = new ObjectMapper();
			MovieBean bean = objectMapper.readValue(value.toString(), MovieBean.class);
			String movie = bean.getMovie();
			int rate = bean.getRate();
			context.write(new Text(movie), new IntWritable(rate));
		}
	}
	
	public static class ReduceTask extends Reducer<Text, IntWritable, Text, FloatWritable>{
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, FloatWritable>.Context context) throws IOException, InterruptedException {
			int sum = 0;
			int count = 0;
			for (IntWritable rate : values) {
				sum+=rate.get();
				count++;
			}
			float avg = sum*1.0f/count;
			context.write(key, new FloatWritable(avg));
		}
	}
	
	public static void main(String[] args) throws Exception{
		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf, "avg");
		
		//设置map和reduce，以及提交的jar
		job.setMapperClass(MapTask.class);
		job.setReducerClass(ReduceTask.class);
		job.setJarByClass(Avg.class);
		
		//设置输入输出类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FloatWritable.class);
		
		//输入和输出目录
		FileInputFormat.addInputPath(job, new Path("d:/data/rating.json"));
		FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\avg"));
		
		//判断文件是否存在
		File file = new File("d:\\data\\out\\avg");
		if(file.exists()){
			FileUtils.deleteDirectory(file);
		}
		
		//提交任务
		boolean completion = job.waitForCompletion(true);
		System.out.println(completion?"你很优秀！！！":"滚去调bug！！");
	}

}
